package com.atguigu.flink.wordcount;

import com.atguigu.flink.function.MyWordCountFlatmapFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Created by Smexy on 2023/11/7
 *
 *  流式API： DataStream API  对比 批处理API DataSet API
 *      区别： 1.环境不同
 *            2.返回的数据模型不同
 *            3.分组方法不同
 *            4.流失编程，需要在最后提交Job
 *
 *            批处理： 把一批数据，经过一个算子全部处理完之后，再发送到下一个算子去处理。
 *                      粒度： 批
 *            流处理： 一批数据的每一条数据，经过一个算子全部处理完之后，发送到下一个算子去处理
                        粒度： 一条

 */
public class Demo2_BoundedStreamDemo
{
    public static void main(String[] args) throws Exception {
        //1.创建环境
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.读取数据，封装为数据模型
        // FileSource: 定义了数据在哪里
        FileSource<String> source = FileSource.forRecordStreamFormat(
                                                 new TextLineInputFormat(), new Path("data/words.txt"))
                                             .build();
        // 去读
        DataStreamSource<String> dataStreamSource = environment.fromSource(source, WatermarkStrategy.noWatermarks(), "wc");
        //3.读取文件中每一行的内容，切分为N个(单词,1)

        dataStreamSource.
            flatMap(new MyWordCountFlatmapFunction())
            //由于数据是tuple所以，需要使用groupBy(int... fields)
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>()
            {
                @Override
                public String getKey(Tuple2<String, Integer> value) throws Exception {
                    return value.f0;
                }
            })
            //对Tuple2的第二列进行累积
            .sum(1)
            .print();

        //提交job，触发运行
        environment.execute();



    }
}
